RxJava2-线程管理和流程浅析

介绍

承接上文,结合使用场景,讨论一下如何告别AsyncTask,就是因为RxJava的强大线程管理功能。

举例说明

认识RxJava之前,我们处理异步任务的方式主要有两种:

  1. AsyncTask
  2. Thread + Runnable。

涉及的代码量相比较RxJava而言大太多,针对Handler处理不好,可能存在内存泄漏的风险。不赘述,看看如何使用RxJava处理异步任务。

异步处理

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Logger("Emit 111");
e.onNext(111);
Logger("Emit 222");
e.onNext(222);
Logger("Emit onComplete");
e.onComplete();

}
});

Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Logger("onSubscribe");
}

@Override
public void onNext(Integer integer) {
Logger("onNext integer = " + integer);
}

@Override
public void onError(Throwable e) {
Logger("onError e = " + e.getMessage());
}

@Override
public void onComplete() {
Logger("onComplete");
}
};
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);

运行结果

这里写图片描述
可以清楚的看到,Observer下的操作是在主线程下完成的,而Observable下发射器的发射动作却是在一个新的线程中完成的。通过这种操作,我们可以在subscribe方法中执行耗时操作,然后结果通过onNext()方法返回给主线程,实现异步处理的目的。
常用的场景:访问数据库,网络请求数据,后台计算操作等等。

Schedulers 和 AndroidSchedulers

AndroidSchedulers是RxAndroid中的线程调度器,主要用途如上所示,AndroidSchedulers.mainThread,代表Android中的主线程(UI线程)。

方法 解释
Schedulers.computation() 用于计算任务
Schedulers.from(Executor executor) 使用指定的Executor作为调度器
Schedulers.io() 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长
Scheduler.newThread() 为每个任务创建一个新线程
Scheduler.shutdown() 停止调度器
Scheduler.single() 单独线程
Scheduler.start() 启动调度器
Scheduler.trampoline() 在当前线程中,但是会等到当前线程任务执行完毕之后再去执行
AndroidScheduler.mainThread() 主线程

源码

Observable.create()

1
2
3
4
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

先执行非空检查,然后通过ObservableCreate来创建Observable。而ObservableCreate继承Observable。看下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
//ObservableCreate继承Observable
public final class ObservableCreate<T> extends Observable<T> {
//ObservableOnSubscribe接口只有一个subscribe方法
final ObservableOnSubscribe<T> source;

public ObservableCreate(ObservableOnSubscribe<T> source) {
//赋值,结合上面Observable的create方法,这个source应该是我们new出来的ObservableOnSubscribe
this.source = source;
}

//这个方法在Observable执行subscribe(Observer)的时候使用到
@Override
protected void subscribeActual(Observer<? super T> observer) {
//创建CreateEmitter,传入observer,内部使用
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//执行observer的onSubscribe方法,parent是CreateEmitter,实现了Disposable,和我们创建Observer时实现的onSubscribe方法一致,没毛病
observer.onSubscribe(parent);

try {
//执行subscribe方法,source为ObservableOnSubscribe对象,parent为CreateEmitter,而CreateEmitter实现ObservableEmitter接口,没毛病
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
//CreateEmitter执行onError方法
parent.onError(ex);
}
}

//CreateEmitter类继承AtomicReference<Disposable>实现ObservableEmitter和Disposable 接口
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {


private static final long serialVersionUID = -3434801548987643227L;

//创建过程中传入的Observer
final Observer<? super T> observer;

CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}

//OnNext方法
@Override
public void onNext(T t) {
//非空检查,onNext在2.0之后不允许传入null值作为参数
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//这个对应上我们的上一篇博客,一次性水管,如果isDisposed为true,则发射器发出的事件,将不会被观察者执行
if (!isDisposed()) {
observer.onNext(t);
}
}

//onError方法,当tryOnErro返回false的时候,执行RxJavaPlugins.onError(t),何时tryOnError会返回false呢?看下面
@Override
public void onError(Throwable t) {
//当isDisposed()为true后,会执行RxJavaPlugins.onError(t)操作,也就是说如果在isDisposed()为true的情况下,发射器还发出onError()事件,会导致程序崩溃。具体看下面的运行示例。
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}

@Override
public boolean tryOnError(Throwable t) {
//也是不允许传入null
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
//如果isDisposed为false,执行观察者的onError方法,然后执行dispose()操作,也就是观察者不处理后面发射器发送的事件了。估计onComplete()方法中也会有类似的操作流程
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
//只有当isDisposed为true的时候回返回false,也就是上一个方法回执行RxJavaPlugins.onError(t);操作
return false;
}

@Override
public void onComplete() {
//和上面onError()操作类似,不同的是没有非空检查,因为onComplete没有参数。
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
……
}
……
//这部分介绍的是SerializedEmitter,暂无涉及
}

举例说明,isDisposed()为true时,发射器继续发送onError事件会导致程序崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Logger("Emit 111");
e.onNext(111);
Logger("Emit 222");
e.onNext(222);
Logger("Emit 333");
e.onNext(333);
Logger("Emit onError");
e.onError(new Throwable("Test Disposable onError"));

}
});

Observer<Integer> observer = new Observer<Integer>() {
Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
Logger("onSubscribe");
mDisposable = d;
}

@Override
public void onNext(Integer integer) {
Logger("onNext integer = " + integer);
if(mDisposable!=null && !mDisposable.isDisposed() && integer == 222) {
mDisposable.dispose();
}
}

@Override
public void onError(Throwable e) {
Logger("onError e = " + e.getMessage());
}

@Override
public void onComplete() {
Logger("onComplete");
}
};
observable.subscribe(observer);

运行结果:
这里写图片描述

Observer

创建Observer,无甚特殊,注意onSubscribe,onNext,onError传入的参数不能为空以及Disposable 的使用。

1
2
3
4
5
6
7
8
9
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);

void onNext(@NonNull T t);

void onError(@NonNull Throwable e);

void onComplete();
}

observable.subscribe(observer)

分析一下subscribe方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final void subscribe(Observer<? super T> observer) {
//observer非空检查
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//关联observable和observer
observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//这个方法在Observable中是个抽象方法,但是结合上面Observerable的create过程,可以知道这里实际上调用的是ObservableCreate的subscribeActual方法,也就是上面我们分析的过程,没毛病
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}

一路看下来,我们就可以很快的和我们做的测试对应上,先调用onSubscribe方法,然后执行subscribe方法中的发射器操作,根据发射器的操作Observer作出对应的处理。

subscribeOn

subscribeOn用来指定Observable在哪个线程执行自己的subscribe方法。

1
2
3
4
5
public final Observable<T> subscribeOn(Scheduler scheduler) {
//scheduler非空检查
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

创建了一个ObservableSubscribeOn,这个类千万别和上面创建Observable过程中使用的ObservableOnSubscribe接口弄混淆,结合当前操作为subscribeOn来记住这个类名。

1
2
3
4
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}

Observerable继承自ObservableSource,所以创建ObservableSubscribeOn的时候Observable和scheduler传递过来。
ObservableSubscribeOn继承
AbstractObservableWithUpstream,而后者又继承Observable。所以实际上经过subscribeOn操作之后,后续操作的对象从Observerable变成了ObservableSubscribeOn,所以,当后面执行subscribe时执行的subscribeActual方法为ObservableSubscribeOn重的对应方法

1
2
3
4
5
6
7
8
9
10
11
12
    @Override
public void subscribeActual(final Observer<? super T> s) {
//封装Observer,实际Observer由其内部actual维护
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//Observer执行onSubscribe方法,SubscribeOnObserver实现Disposable接口,所以上面的例子中onSubscribe传递的是Disposable类型
s.onSubscribe(parent);
//这里有3个操作:
//1. 创建SubscribeTask,传入封装后的Observer
//2. 调度器执行scheduleDirect操作
//3. 封装后的Observer执行setDisposable操作
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

关注scheduler.scheduleDirect(new SubscribeTask(parent)),先看下

1
2
3
4
5
6
7
8
9
10
11
12
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

@Override
public void run() {
source.subscribe(parent);
}
}

实现Runnable,终于看到一个有点熟悉的东西。传入的parent为封装后的Observer。而source则是创建ObservableSubscribeOn过程中传入的Observable。
在看scheduleDirect方法之前,我们得先弄清楚这个scheduler是个什么东西?在Schedulers类中,不同的Scheduler已经初始化完成。

1
2
3
4
5
6
7
8
9
10
11
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

IO = RxJavaPlugins.initIoScheduler(new IOTask());

TRAMPOLINE = TrampolineScheduler.instance();

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}

就看下NEW_THREAD 这个,首先new NewThreadTask()

1
2
3
4
5
6
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
1
2
3
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final class NewThreadScheduler extends Scheduler {

final ThreadFactory threadFactory;

private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;

/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

static {
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}

public NewThreadScheduler() {
this(THREAD_FACTORY);
}

public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}

看到了ThreadFactory,RxThreadFactory,而RxThreadFactory实现了ThreadFactory接口,所以最后还是线程的使用,只是RxJava对这些基础的东西做了深度的封装和流程上的优化,让我们更方便的使用。
回溯到上面的scheduleDirect方法,

1
2
3
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//创建工作线程,以NewThreadScheduler为例,是创建NewThreadWorker
final Worker w = createWorker();
//
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

//封装出一个带Dispose的Task,方便控制
DisposeTask task = new DisposeTask(decoratedRun, w);
//以NewThreadScheduler为例,这里执行的是NewThreadWorker中的schedule方法
w.schedule(task, delay, unit);

return task;
}
1
2
3
4
5
6
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}

忽略disposed的影响,最后执行到scheduleActual

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}

Future<?> f;
try {
//直接提交或者进入队列
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}

return sr;
}

可以看到熟悉的ScheduledExecutorService和Future。
所以,综上所述,其内部也是新创建一个线程,结合Runnable。

observeOn

原理我觉得和subscribeOn没有太大的差别。不做赘述。

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×